/*

Copyright (C) SYSTAP, LLC DBA Blazegraph 2006-2016.  All rights reserved.

Contact:
     SYSTAP, LLC DBA Blazegraph
     2501 Calvert ST NW #106
     Washington, DC 20008
     licenses@blazegraph.com

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

*/
package com.bigdata.sparse;

import java.io.UnsupportedEncodingException;
import java.util.Date;

import com.bigdata.btree.keys.IKeyBuilder;
import com.bigdata.btree.keys.KeyBuilder;
import com.bigdata.util.BytesUtil;

/**
 * A utility class that decodes a key in a {@link SparseRowStore} into the
 * {@link KeyType} for the primary key, the column name, and the timestamp. Note
 * that the exact schema name itself is not recoverable since it is encoded
 * using a non-reversible algorithm (it is a sort key generated by a Unicode
 * collator). Likewise, the primary key can be decoded for primitive data types,
 * but while we can identify the bytes corresponding to the primary key for a
 * Unicode {@link KeyType} we can not decode them (it is also a sort key
 * generated by a Unicode collator). The column name is NOT stored with Unicode
 * compression so that we can decode it without loss (it is encoded into bytes
 * using UTF-8 and those bytes are written directly into the key). This means
 * that column names are NOT ordered according to the Unicode collator. In
 * practice this is not a problem since we never assume order for that part of
 * the key. The {@link SparseRowStore} only relies on {columnName,timestamp}
 * defining the semantics of distinct keys for a given {schema,primaryKey}
 * prefix.
 * <p>
 * The encoded schema name is followed by the {@link KeyType#getByteCode()} and
 * then by a <code>nul</code> byte. By searching for the <code>nul</code> byte
 * we can identify the end of the encoded schema name and also the data type of
 * the primary key. Most kinds of primary keys have a fixed length encoding,
 * e.g., {@link Long}, {@link Double}, etc.
 * <p>
 * Unicode primary keys have a variable length encoding which makes life more
 * complex. For Unicode primary keys, we break with the collation order and use
 * the UTF8 encoding of the key. This means that the primary key can be decoded
 * and preserves hierarchical namespace clustering within the row store but does
 * not impose a total sort order per Unicode sort key semantics. The only
 * reasonable approach is to append a byte sequence to the key that never occurs
 * within the generated Unicode sort keys. Again, we use a <code>nul</code> byte
 * to mark the end of the Unicode primary key since it is not emitted by most
 * Unicode collation implementations as it would cause grief for C-language
 * strings. (However, see {@link SparseRowStore.Options#PRIMARY_KEY_UNICODE_CLEAN}
 * for information on backward compatibility.)
 * 
 * @see Schema#fromKey(IKeyBuilder, Object)
 * @see KeyType#getKeyType(byte)
 * @see AtomicRowWriteRead
 * @see AtomicRowRead
 * 
 * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
 * @version $Id$
 * 
 * @todo The key is now 100% decodable. The package should be updated to take
 *       advantage of that.
 */
public class KeyDecoder {

    /**
     * The key that was specified to the ctor.
     */
    private final byte[] key;
    
    /**
     * The #of bytes in the encoded schema name (does not include either the
     * byte encoding the {@link KeyType} of the primary key or the
     * <code>nul</code> byte that terminates the schema component in the key).
     */
    private final int schemaBytesLength;

    /**
     * Offset of the byte that encoded the {@link KeyType} for the primary key.
     * This is basically part of the schema component of the key but it is
     * distinct from the bytes returned by {@link Schema#getSchemaBytes()}.
     */
    private final int primaryKeyTypeOffset;

    /**
     * The offset of the first byte in the encoded primary key.
     */
    private final int primaryKeyOffset;

    /**
     * The #of bytes in the encoded primary key (does not include the
     * <code>nul</code> byte that terminates variable length primary keys).
     */
    private final int primaryKeyLength;
    
    /**
     * The decoded value of the primary key -or- <code>null</code> if it is
     * not possible to decode the {@link KeyType}.
     */
    private final Object primaryKey;
    
    /**
     * The offset of the first byte in the encoded column name.
     */
    private final int columnNameOffset;

    /**
     * The #of bytes in the encoded column name (does not include the
     * <code>nul</code> byte that terminates the column name).
     */
    private final int columnNameLength;
    
    /**
     * The offset of the first byte in the timestamp (it is always 8 bytes
     * long).
     */
    private final int timestampOffset;
    
    /**
     * The decoded {@link KeyType} for the primary key.
     */
    private final KeyType primaryKeyType;
    
    /**
     * The decoded column name.
     */
    private final String col;

    /**
     * The decoded timestamp on the column value.
     */
    public final long timestamp;

    /**
     * The bytes from the key that represent the encoded name of the
     * {@link Schema}.
     * 
     * @return A new array holding a copy of the first
     *         {@link #schemaBytesLength} bytes of the key.
     */
    public byte[] getSchemaBytes() {

        final byte[] a = new byte[schemaBytesLength];
        
        System.arraycopy(key, 0, a, 0, schemaBytesLength);
        
        return a;
        
    }

    /**
     * Return the schema name.
     * <p>
     * The schema bytes are decoded as UTF-8 (the encoding used for the schema
     * name when the unicode clean option is in effect) rather than with the
     * platform default character set, so the result does not depend on the
     * JVM's locale settings.
     * 
     * @return The decoded schema name.
     * 
     * @throws UnsupportedOperationException
     *             unless {@link SparseRowStore#schemaNameUnicodeClean} is
     *             <code>true</code>.
     * @throws RuntimeException
     *             if the UTF-8 character set is not available.
     */
    public String getSchemaName() {
        
        if (!SparseRowStore.schemaNameUnicodeClean) {

            throw new UnsupportedOperationException();

        }

        try {

            return new String(key, 0, schemaBytesLength, SparseRowStore.UTF8);

        } catch (UnsupportedEncodingException ex) {

            throw new RuntimeException(
                    "Could not decode the schema name: schemaBytesLength="
                            + schemaBytesLength + ", key="
                            + BytesUtil.toString(key), ex);

        }
        
    }
    
    /**
     * The decoded {@link KeyType} for the primary key.
     */
    public final KeyType getPrimaryKeyType() {
        
        return primaryKeyType;
        
    }
    
    /**
     * The decoded primary key.
     * 
     * @throws UnsupportedOperationException
     *             if the primary key can not be decoded.
     */
    public Object getPrimaryKey() {
        
        if (primaryKey == null) {

            throw new UnsupportedOperationException("Can not decode: keyType="
                    + primaryKeyType);

        }
        
        return primaryKey;
        
    }
    
    /**
     * The decoded column name.
     */
    public final String getColumnName() {
        
        return col;
        
    }

    /**
     * The decoded timestamp on the column value. The semantics of the
     * timestamp depend entirely on the application. When the application
     * provides timestamps, they are application defined long integers. When
     * the application requests auto-timestamps, they are generated by the
     * data service.
     */
    public long getTimestamp() {

        return timestamp;

    }

    /**
     * Decodes a {@link SparseRowStore} key into its components: the encoded
     * schema name, the {@link KeyType} of the primary key, the primary key,
     * the column name, and the timestamp.
     * 
     * @param key
     *            The key to be decoded. The reference is retained (the array
     *            is not copied).
     * 
     * @throws IllegalArgumentException
     *             if <i>key</i> is <code>null</code>.
     * @throws RuntimeException
     *             if the <code>nul</code> byte terminating the schema name, a
     *             variable length primary key, or the column name can not be
     *             located, or if the primary key or column name can not be
     *             decoded.
     */
    public KeyDecoder(final byte[] key) {

        if (key == null) {

            throw new IllegalArgumentException();

        }

        this.key = key;
        
        /*
         * Find the end of the encoded schema name. This also gives us the type
         * of the primary key and the offset of the primary key.
         * 
         * Note: the KeyType byte occurs after the schema name bytes and before
         * the [nul].
         */
        {

            final int schemaEnd = indexOfNul(key, 0);

            if (schemaEnd == -1) {

                throw new RuntimeException(
                        "Could not locate the end of the encoded schema name: key="
                                + BytesUtil.toString(key));

            }

            // The byte immediately before the [nul] is the KeyType byte.
            this.schemaBytesLength = schemaEnd - 1;

            this.primaryKeyTypeOffset = schemaEnd - 1;

            // The primary key starts immediately after the [nul].
            this.primaryKeyOffset = schemaEnd + 1;

            /*
             * Note: ArrayIndexOutOfBounds with index==-1 is an indication that
             * the schema name or a Unicode primary key contained embedded nul
             * bytes. This should no longer be possible when using the unicode
             * clean options on the SparseRowStore which encoded those data as
             * UTF8 rather than as Unicode sort keys. Historically, these were
             * encoded as Unicode sort keys. However, the JDK CollatorEnum
             * option does not support compressed Unicode sort keys and embeds
             * nul bytes in its generated sort keys. We rely on nul bytes as
             * boundary markers when decoding the row store keys. The presence
             * of those nul bytes within the schema and/or a Unicode primary
             * key was causing the ArrayIndexOutOfBoundsException here.
             */
            this.primaryKeyType = KeyType.getKeyType(KeyBuilder
                    .decodeByte(key[primaryKeyTypeOffset]));
            
        }

        /*
         * Find the end of the primary key. For some key types the primary key
         * has a fixed length and we just skip that many bytes. For variable
         * length keys we scan to the next [nul] byte.
         */
        if (primaryKeyType.isFixedLength()) {

            this.primaryKeyLength = primaryKeyType.getEncodedLength();

            this.columnNameOffset = primaryKeyOffset + primaryKeyLength;

        } else {

            final int primaryKeyEnd = indexOfNul(key, primaryKeyOffset);

            if (primaryKeyEnd == -1) {

                throw new RuntimeException(
                        "Could not locate the end of the primary key: keyType="
                                + primaryKeyType + ", key="
                                + BytesUtil.toString(key));

            }

            this.primaryKeyLength = primaryKeyEnd - primaryKeyOffset;

            // Note: also skips the [nul] byte terminating the primary key.
            this.columnNameOffset = primaryKeyEnd + 1;

        }

        this.primaryKey = decodePrimaryKey(key, primaryKeyType,
                primaryKeyOffset, primaryKeyLength);

        /*
         * Decode the column name. All bytes until the next [nul] are the column
         * name.
         * 
         * Note: The column name is NOT compressed using Unicode compression so
         * that we can decode it without loss.
         */
        {

            final int columnNameEnd = indexOfNul(key, columnNameOffset);

            if (columnNameEnd == -1) {

                /*
                 * Could not unpack the column name from the key!
                 */

                throw new RuntimeException(
                        "Could not locate the end of the column name: keyType="
                                + primaryKeyType + ", columnNameOffset="
                                + columnNameOffset + ", key="
                                + BytesUtil.toString(key));

            }

            this.columnNameLength = columnNameEnd - columnNameOffset;

            // The timestamp starts immediately after the [nul].
            this.timestampOffset = columnNameEnd + 1;

            try {

                this.col = new String(key, columnNameOffset, columnNameLength,
                        SparseRowStore.UTF8);

            } catch (UnsupportedEncodingException ex) {

                throw new RuntimeException(
                        "Could not decode the column name: keyType="
                                + primaryKeyType + ", columnNameOffset="
                                + columnNameOffset + ", columnNameLength="
                                + columnNameLength + ", key="
                                + BytesUtil.toString(key), ex);

            }

        }
         
        /*
         * Decode the timestamp.
         */
        this.timestamp = KeyBuilder.decodeLong(key, timestampOffset);

    }

    /**
     * Return the index of the first <code>nul</code> byte in the key at or
     * after the given index.
     * 
     * @param key
     *            The key.
     * @param fromIndex
     *            The index of the first byte to be examined.
     * 
     * @return The index of the first <code>nul</code> byte -or-
     *         <code>-1</code> if there is none (including when
     *         <i>fromIndex</i> is at or beyond the end of the key).
     */
    private static int indexOfNul(final byte[] key, final int fromIndex) {

        for (int i = fromIndex; i < key.length; i++) {

            if (key[i] == (byte) 0) {

                return i;

            }

        }

        return -1;

    }

    /**
     * Decode the primary key from its slice of the key.
     * 
     * @param key
     *            The key.
     * @param keyType
     *            The {@link KeyType} of the primary key.
     * @param off
     *            The offset of the first byte of the encoded primary key.
     * @param len
     *            The #of bytes in the encoded primary key (only used for the
     *            variable length key types).
     * 
     * @return The decoded primary key -or- <code>null</code> for a
     *         {@link KeyType#Unicode} primary key when
     *         {@link SparseRowStore#primaryKeyUnicodeClean} is
     *         <code>false</code> (decode is not possible for that case).
     */
    private static Object decodePrimaryKey(final byte[] key,
            final KeyType keyType, final int off, final int len) {

        switch (keyType) {
        case Integer:
            return KeyBuilder.decodeInt(key, off);
        case Long:
            return KeyBuilder.decodeLong(key, off);
        case Double:
            return KeyBuilder.decodeDouble(key, off);
        case Float:
            return KeyBuilder.decodeFloat(key, off);
        case Unicode:
            if (!SparseRowStore.primaryKeyUnicodeClean) {
                /*
                 * Note: Decode is not possible for this case.
                 */
                return null;
            }
            try {
                return new String(key, off, len, SparseRowStore.UTF8);
            } catch (UnsupportedEncodingException ex) {
                throw new RuntimeException(
                        "Could not decode the primary key"
                                + ": primaryKeyOffset="
                                + off
                                + ", primaryKeyLength="
                                + len + ", key="
                                + BytesUtil.toString(key), ex);
            }
        case ASCII:
            return KeyBuilder.decodeASCII(key, off, len);
        case Date:
            return new Date(KeyBuilder.decodeLong(key, off));
        default:
            throw new AssertionError("Unknown keyType=" + keyType);
        }

    }

    /**
     * Returns the head of the key corresponding to the encoded schema name, the
     * primary key's {@link KeyType}, and the primary key (including any
     * terminating <code>nul</code> byte).
     * 
     * @return A new array holding a copy of the first
     *         {@link #getPrefixLength()} bytes of the key.
     */
    public byte[] getPrefix() {
        
        final int n = columnNameOffset;
        
        final byte[] b = new byte[n];
        
        System.arraycopy(key, 0, b, 0, n);
        
        return b;
        
    }

    /**
     * Returns the length of the prefix corresponding to the encoded schema
     * name, the primary key's {@link KeyType}, and the primary key (including
     * any terminating <code>nul</code> byte).
     * 
     * @return The #of bytes in the prefix (this is the offset of the first
     *         byte of the column name).
     */
    public int getPrefixLength() {
        
        return columnNameOffset;
        
    }
    
    /**
     * Shows some of the data that is extracted. The schema name and the
     * primary key are only included when the corresponding unicode clean
     * option on the {@link SparseRowStore} is <code>true</code>.
     */
    @Override
    public String toString() {
        
        return "KeyDecoder{"
                + (SparseRowStore.schemaNameUnicodeClean ? "schema="
                        + getSchemaName() + "," : "")//
                + "primaryKeyType="+ primaryKeyType//
                + (SparseRowStore.primaryKeyUnicodeClean ? ",primaryKey="
                        + getPrimaryKey() : "")//
                + ",col=" + col //
                + ",timestamp=" + timestamp //
                + ",key=" + BytesUtil.toString(key) //
                + "}";

    }

}
